\section{Receiving Messages}
\label{Sec::Receive}

The current \textit{behavior} of an actor is its response to the \textit{next} incoming message and includes (a) sending messages to other actors, (b) creation of more actors, and (c) setting a new behavior.

An event-based actor, i.e., the default implementation in \lib, uses \lstinline^become^ to set its behavior.
The given behavior is then executed until it is replaced by another call to \lstinline^become^ or the actor finishes execution.

\subsection{Class-based actors}

A class-based actor is a subtype of \lstinline^event_based_actor^ and must implement the pure virtual member function \lstinline^make_behavior^ returning the initial behavior.

\begin{lstlisting}
class printer : public event_based_actor {
  behavior make_behavior() override {
    return {
      others >> [] {
        cout << to_string(last_dequeued()) << endl;
      }
    };
  }
};
\end{lstlisting}

\clearpage
\begin{lstlisting}
using pop_atom = atom_constant<atom("pop")>;
using push_atom = atom_constant<atom("push")>;

class fixed_stack : public event_based_actor {
public:
  fixed_stack(size_t max) : max_size(max)  {
    full_.assign(
      [=](push_atom, int) {
        // discard
      },
      [=](pop_atom) -> message {
        auto result = data.back();
        data.pop_back();
        become(filled_);
        return make_message(ok_atom::value, result);
      }
    );
    filled_.assign(
      [=](push_atom, int what) {
        data.push_back(what);
        if (data.size() == max_size) become(full_);
      },
      [=](pop_atom) -> message {
        auto result = data.back();
        data.pop_back();
        if (data.empty()) become(empty_);
        return make_message(ok_atom::value, result);
      }
    );
    empty_.assign(
      [=](push_atom, int what) {
        data.push_back(what);
        become(filled_);
      },
      [=](pop_atom) {
        return error_atom::value;
      }
    );
  }

  behavior make_behavior() override {
    return empty_;
  }

  size_t max_size;
  std::vector<int> data;
  behavior full_;
  behavior filled_;
  behavior empty_;
};
\end{lstlisting}

\clearpage
\subsection{Nesting Receives Using \lstinline^become/unbecome^}

Since \lstinline^become^ does not block, an actor has to manipulate its behavior stack to achieve nested receive operations.
An actor can set a new behavior by calling \lstinline^become^ with the \lstinline^keep_behavior^ policy to be able to return to its previous behavior later on by calling \lstinline^unbecome^, as shown in the example below.

\begin{lstlisting}
// receives {int, float} sequences
behavior testee(event_based_actor* self) {
  return {
    [=](int value1) {
      self->become (
        // the keep_behavior policy stores the current behavior
        // on the behavior stack to be able to return to this
        // behavior later on by calling unbecome()
        keep_behavior,
        [=](float value2) {
          cout << value1 << " => " << value2 << endl;
          // restore previous behavior
          self->unbecome();
        }
      );
    }
  };
}
\end{lstlisting}

An event-based actor finishes execution with normal exit reason if the behavior stack is empty after calling \lstinline^unbecome^.
The default policy of \lstinline^become^ is \lstinline^discard_behavior^ that causes an actor to override its current behavior.
The policy flag must be the first argument of \lstinline^become^.

\textbf{Note}: the message handling in \lib is consistent among all actor implementations: unmatched messages are \textit{never} implicitly discarded if no suitable handler was found.
Hence, the order of arrival is not important in the example above.
This is unlike other event-based implementations of the actor model such as Akka for instance.

\clearpage
\subsection{Timeouts}
\label{Sec::Receive::Timeouts}

A behavior set by \lstinline^become^ is invoked whenever a new message arrives.
If no message ever arrives, the actor would wait forever.
This might be desirable if the actor only provides a service and should not do anything else.
But often, we need to be able to recover if an expected message does not arrive within a certain time period. The following examples illustrates the usage of \lstinline^after^ to define a timeout.

\begin{lstlisting}
behavior eager_actor(event_based_actor* self) {
  return {
    [](int i) { /* ... */ },
    [](float i) { /* ... */ },
    others >> [] { /* ... */ },
    after(std::chrono::seconds(10)) >> [] {
      aout(self) << "received nothing within 10 seconds..." << endl;
      // ...
    }
  };
}
\end{lstlisting}

Callbacks given as timeout handler must have zero arguments.
Any number of patterns can precede the timeout definition, but  ``\lstinline^after^'' must always be the final statement.
Using a zero-duration timeout causes the actor to scan its mailbox once and then invoke the timeout immediately if no matching message was found.

\lib supports timeouts using \lstinline^minutes^, \lstinline^seconds^, \lstinline^milliseconds^ and \lstinline^microseconds^.
However, note that the precision depends on the operating system and your local work load.
Thus, you should not depend on a certain clock resolution.

\clearpage
\subsection{Skipping Messages}

Unmatched messages are skipped automatically by \lib's runtime system.
This is true for \textit{all} actor implementations.
To allow actors to skip messages manually, \lstinline^skip_message^ can be used.
This is in particular useful whenever an actor switches between behaviors, but wants to use a default rule created by \lstinline^others^ to catch messages that are not handled by any of its behaviors.

The following example illustrates a simple server actor that dispatches requests to workers.
After receiving an \lstinline^'idle'^ message, it awaits a request that is then forwarded to the idle worker.
Afterwards, the server returns to its initial behavior, i.e., awaits the next \lstinline^'idle'^ message.
The server actor will exit for reason \lstinline^user_defined^ whenever it receives a message that is neither a request, nor an idle message.

\begin{lstlisting}
using idle_atom = atom_constant<atom("idle")>;
using request_atom = atom_constant<atom("request")>;

behavior server(event_based_actor* self) {
  auto die = [=] { self->quit(exit_reason::user_defined); };
  return {
    [=](idle_atom) {
      auto worker = last_sender();
      self->become (
        keep_behavior,
        [=](request_atom) {
          // forward request to idle worker
          self->forward_to(worker);
          // await next idle message
          self->unbecome();
        },
        [=](idle_atom) {
          return skip_message();
        },
        others >> die
      );
    },
    [=](request_atom) {
      return skip_message();
    },
    others >> die
  };
}
\end{lstlisting}
